Prophet forecaster (wrapper + from-scratch)

Prophet forecaster (wrapper + from-scratch)#

Goals#

  • Implement a Prophet(...) forecaster interface that can:

    • wrap the prophet Python package (Meta/Facebook Prophet) when available

    • fall back to a lightweight Prophet-like implementation from scratch (NumPy)

  • Understand Prophet’s core additive structure, changepoints, and Fourier seasonality.

Prerequisites#

  • Required: numpy, pandas, plotly

  • Optional: prophet (the official implementation)

If you want the wrapper to use the official library:

pip install prophet

Note: the official Prophet stack may download/compile Stan tooling depending on your setup.

Prophet model (high level)#

Prophet is a decomposable additive model:

\[ y(t) = g(t) + s(t) + h(t) + \varepsilon_t \]
  • \(g(t)\): trend (often piecewise linear with changepoints, or logistic growth)

  • \(s(t)\): seasonality (typically Fourier series for weekly/yearly patterns)

  • \(h(t)\): holiday / event effects (indicator features)

  • \(\varepsilon_t\): noise

This notebook implements a piecewise linear trend + Fourier seasonality baseline.

import numpy as np
import pandas as pd

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import os
import plotly.io as pio

pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
pio.templates.default = "plotly_white"

np.set_printoptions(precision=4, suppress=True)
def _as_prophet_df(y: pd.Series | pd.DataFrame, *, y_col: str = "y") -> pd.DataFrame:
    """Coerce input into Prophet-style DataFrame with columns: ds (datetime), y (float)."""
    if isinstance(y, pd.Series):
        if y.index is None:
            raise ValueError("If y is a Series, it must have a DatetimeIndex")
        df = pd.DataFrame({"ds": pd.to_datetime(y.index), "y": y.astype(float).to_numpy()})
    else:
        if "ds" not in y.columns:
            raise ValueError("If y is a DataFrame, it must contain a 'ds' column")
        if y_col not in y.columns:
            raise ValueError(f"If y is a DataFrame, it must contain a '{y_col}' column")
        df = y[["ds", y_col]].rename(columns={y_col: "y"}).copy()
        df["ds"] = pd.to_datetime(df["ds"])
        df["y"] = df["y"].astype(float)

    df = df.dropna(subset=["ds", "y"]).sort_values("ds").reset_index(drop=True)
    if df.empty:
        raise ValueError("No data after dropping NaNs")
    return df


def _infer_freq(ds: pd.Series) -> str:
    """Infer a pandas frequency string from a datetime series."""
    ds = pd.to_datetime(ds)
    inferred = pd.infer_freq(ds)
    if inferred is not None:
        return inferred

    deltas = ds.sort_values().diff().dropna()
    if deltas.empty:
        return "D"
    median = deltas.median()
    # Coarse mapping for common cases
    if median <= pd.Timedelta(hours=1):
        return "H"
    if median <= pd.Timedelta(days=1):
        return "D"
    if median <= pd.Timedelta(days=7):
        return "W"
    return "D"


def _future_ds(last_ds: pd.Timestamp, steps: int, freq: str) -> pd.DatetimeIndex:
    last_ds = pd.to_datetime(last_ds)
    if steps < 1:
        return pd.DatetimeIndex([])
    start = last_ds + pd.tseries.frequencies.to_offset(freq)
    return pd.date_range(start=start, periods=int(steps), freq=freq)
def fourier_features(t_days: np.ndarray, period_days: float, fourier_order: int) -> np.ndarray:
    """Fourier series features used by Prophet for seasonality.

    Returns shape (n, 2*fourier_order): [sin(2πnt/P), cos(2πnt/P)] for n=1..N.
    """
    t_days = np.asarray(t_days, dtype=float).reshape(-1)
    if fourier_order <= 0:
        return np.zeros((t_days.size, 0), dtype=float)
    n = np.arange(1, int(fourier_order) + 1, dtype=float)
    x = 2.0 * np.pi * (t_days[:, None] * n[None, :] / float(period_days))
    return np.concatenate([np.sin(x), np.cos(x)], axis=1)


def changepoint_features(t_scaled: np.ndarray, changepoints_scaled: np.ndarray) -> np.ndarray:
    """Piecewise-linear basis: max(0, t - s_j) for each changepoint s_j."""
    t_scaled = np.asarray(t_scaled, dtype=float).reshape(-1)
    changepoints_scaled = np.asarray(changepoints_scaled, dtype=float).reshape(-1)
    if changepoints_scaled.size == 0:
        return np.zeros((t_scaled.size, 0), dtype=float)
    return np.maximum(0.0, t_scaled[:, None] - changepoints_scaled[None, :])


def ridge_fit(X: np.ndarray, y: np.ndarray, l2: np.ndarray) -> np.ndarray:
    """Solve (X^T X + diag(l2)) beta = X^T y."""
    X = np.asarray(X, dtype=float)
    y = np.asarray(y, dtype=float).reshape(-1)
    l2 = np.asarray(l2, dtype=float).reshape(-1)
    if l2.size != X.shape[1]:
        raise ValueError("l2 must have same length as number of columns in X")

    A = X.T @ X + np.diag(l2)
    b = X.T @ y
    return np.linalg.solve(A, b)
class ProphetLike:
    """A lightweight Prophet-like forecaster (trend + Fourier seasonality).

    This is NOT the full Bayesian Stan-based Prophet implementation.
    It approximates Prophet's core idea using regularized linear regression.
    """

    def __init__(
        self,
        *,
        freq: str | None = None,
        n_changepoints: int = 25,
        changepoint_range: float = 0.8,
        yearly_seasonality: int | bool = 10,
        weekly_seasonality: int | bool = 3,
        daily_seasonality: int | bool = False,
        add_seasonality: list[dict] | None = None,
        lambda_changepoints: float = 10.0,
        lambda_seasonality: float = 1.0,
    ):
        self.freq = freq
        self.n_changepoints = int(n_changepoints)
        self.changepoint_range = float(changepoint_range)
        self.yearly_seasonality = yearly_seasonality
        self.weekly_seasonality = weekly_seasonality
        self.daily_seasonality = daily_seasonality
        self.add_seasonality = add_seasonality or []
        self.lambda_changepoints = float(lambda_changepoints)
        self.lambda_seasonality = float(lambda_seasonality)

        if not (0 < self.changepoint_range <= 1):
            raise ValueError("changepoint_range must be in (0, 1]")

    @staticmethod
    def _seasonality_order(value: int | bool, default_if_true: int) -> int:
        if value is False or value is None:
            return 0
        if value is True:
            return int(default_if_true)
        return int(value)

    def _build_matrix(self, ds: pd.Series, changepoints_scaled: np.ndarray):
        ds = pd.to_datetime(ds)
        t_days = (ds - self.ds0_).dt.total_seconds().to_numpy() / 86400.0
        t_scaled = t_days / self.t_scale_

        # Trend basis: intercept + time + changepoint hinge functions
        A = changepoint_features(t_scaled, changepoints_scaled)
        X_trend = np.column_stack([np.ones(ds.size), t_scaled, A])

        # Seasonality basis
        X_season_parts = []
        season_names = []

        yearly_order = self._seasonality_order(self.yearly_seasonality, default_if_true=10)
        weekly_order = self._seasonality_order(self.weekly_seasonality, default_if_true=3)
        daily_order = self._seasonality_order(self.daily_seasonality, default_if_true=4)

        if yearly_order > 0:
            X_season_parts.append(fourier_features(t_days, period_days=365.25, fourier_order=yearly_order))
            season_names.append("yearly")
        if weekly_order > 0:
            X_season_parts.append(fourier_features(t_days, period_days=7.0, fourier_order=weekly_order))
            season_names.append("weekly")
        if daily_order > 0:
            X_season_parts.append(fourier_features(t_days, period_days=1.0, fourier_order=daily_order))
            season_names.append("daily")

        for spec in self.add_seasonality:
            name = spec["name"]
            period = float(spec["period"])
            order = int(spec["fourier_order"])
            X_season_parts.append(fourier_features(t_days, period_days=period, fourier_order=order))
            season_names.append(name)

        X_season = np.concatenate(X_season_parts, axis=1) if X_season_parts else np.zeros((ds.size, 0))
        return t_days, t_scaled, X_trend, X_season, season_names

    def fit(self, y: pd.Series | pd.DataFrame):
        df = _as_prophet_df(y)
        self.freq_ = self.freq or _infer_freq(df["ds"])
        self.ds0_ = pd.to_datetime(df["ds"].iloc[0])

        t_days = (df["ds"] - self.ds0_).dt.total_seconds().to_numpy() / 86400.0
        self.t_scale_ = float(np.max(t_days)) if float(np.max(t_days)) > 0 else 1.0
        t_scaled = t_days / self.t_scale_

        # Choose changepoints inside the first `changepoint_range` portion of the history
        n = df.shape[0]
        cp_max = float(np.quantile(t_scaled, self.changepoint_range))
        candidate = t_scaled[t_scaled <= cp_max]
        cp_count = max(0, min(self.n_changepoints, candidate.size - 2))
        if cp_count == 0:
            self.changepoints_scaled_ = np.array([], dtype=float)
        else:
            qs = np.linspace(0, 1, cp_count + 2)[1:-1]
            self.changepoints_scaled_ = np.quantile(candidate, qs)

        _, _, X_trend, X_season, season_names = self._build_matrix(df["ds"], self.changepoints_scaled_)
        self.season_names_ = season_names
        self.X_trend_cols_ = X_trend.shape[1]

        X = np.concatenate([X_trend, X_season], axis=1)

        # Regularization (MAP with Gaussian priors)
        l2 = np.zeros(X.shape[1], dtype=float)
        # No penalty on intercept and base slope
        # Penalty on changepoint hinge terms
        if X_trend.shape[1] > 2:
            l2[2 : X_trend.shape[1]] = self.lambda_changepoints
        # Penalty on seasonality coefficients
        if X_season.shape[1] > 0:
            l2[X_trend.shape[1] :] = self.lambda_seasonality

        self.coef_ = ridge_fit(X, df["y"].to_numpy(), l2)
        self.train_ = df
        self.fitted_ = self._predict_df(df["ds"], include_components=True)
        return self

    def _predict_df(self, ds: pd.Series, *, include_components: bool) -> pd.DataFrame:
        ds = pd.to_datetime(ds)
        t_days, _, X_trend, X_season, _ = self._build_matrix(ds, self.changepoints_scaled_)
        X = np.concatenate([X_trend, X_season], axis=1)

        yhat = X @ self.coef_
        out = pd.DataFrame({"ds": ds, "yhat": yhat})

        if include_components:
            coef_trend = self.coef_[: X_trend.shape[1]]
            coef_season = self.coef_[X_trend.shape[1] :]
            out["trend"] = X_trend @ coef_trend
            out["seasonality"] = (X_season @ coef_season) if X_season.shape[1] > 0 else 0.0
            out["t_days"] = t_days

        return out

    def predict(self, steps: int = 0, *, ds: pd.Series | None = None) -> pd.DataFrame:
        if not hasattr(self, "train_"):
            raise RuntimeError("Call fit() first")
        if ds is None:
            last_ds = pd.to_datetime(self.train_["ds"].iloc[-1])
            ds = _future_ds(last_ds, int(steps), self.freq_)
        return self._predict_df(pd.Series(ds), include_components=True)
class Prophet:
    """Prophet forecaster interface.

    - backend='auto': use `prophet` package if installed, else fallback to ProphetLike
    - backend='prophet': require `prophet` package
    - backend='from_scratch': force the lightweight ProphetLike implementation

    Parameters shown here are intentionally minimal; extend as needed.
    """

    def __init__(
        self,
        *,
        freq: str | None = None,
        add_seasonality: list[dict] | None = None,
        backend: str = "auto",
        # common Prophet-ish knobs
        n_changepoints: int = 25,
        changepoint_range: float = 0.8,
        yearly_seasonality: int | bool = 10,
        weekly_seasonality: int | bool = 3,
        daily_seasonality: int | bool = False,
        # regularization for fallback model
        lambda_changepoints: float = 10.0,
        lambda_seasonality: float = 1.0,
        # kwargs passed to official `prophet.Prophet(...)` if available
        prophet_kwargs: dict | None = None,
        add_country_holidays: str | None = None,
    ):
        self.freq = freq
        self.add_seasonality = add_seasonality or []
        self.backend = backend
        self.n_changepoints = int(n_changepoints)
        self.changepoint_range = float(changepoint_range)
        self.yearly_seasonality = yearly_seasonality
        self.weekly_seasonality = weekly_seasonality
        self.daily_seasonality = daily_seasonality
        self.lambda_changepoints = float(lambda_changepoints)
        self.lambda_seasonality = float(lambda_seasonality)
        self.prophet_kwargs = prophet_kwargs or {}
        self.add_country_holidays = add_country_holidays

    @staticmethod
    def _prophet_available() -> bool:
        try:
            import prophet  # noqa: F401

            return True
        except Exception:
            return False

    def fit(self, y: pd.Series | pd.DataFrame):
        if self.backend not in {"auto", "prophet", "from_scratch"}:
            raise ValueError("backend must be one of: auto, prophet, from_scratch")

        if self.backend in {"auto", "prophet"} and self._prophet_available():
            from prophet import Prophet as ProphetModel

            df = _as_prophet_df(y)
            self.freq_ = self.freq or _infer_freq(df["ds"])

            model = ProphetModel(
                yearly_seasonality=self.yearly_seasonality,
                weekly_seasonality=self.weekly_seasonality,
                daily_seasonality=self.daily_seasonality,
                n_changepoints=self.n_changepoints,
                changepoint_range=self.changepoint_range,
                **self.prophet_kwargs,
            )

            for spec in self.add_seasonality:
                model.add_seasonality(**spec)
            if self.add_country_holidays is not None:
                model.add_country_holidays(country_name=self.add_country_holidays)

            model.fit(df)

            self._impl = model
            self._train_df = df
            self._backend_ = "prophet"
            return self

        if self.backend == "prophet":
            raise ModuleNotFoundError(
                "`prophet` is not installed. Install it (e.g. `pip install prophet`) or use backend='from_scratch'."
            )

        self._impl = ProphetLike(
            freq=self.freq,
            n_changepoints=self.n_changepoints,
            changepoint_range=self.changepoint_range,
            yearly_seasonality=self.yearly_seasonality,
            weekly_seasonality=self.weekly_seasonality,
            daily_seasonality=self.daily_seasonality,
            add_seasonality=self.add_seasonality,
            lambda_changepoints=self.lambda_changepoints,
            lambda_seasonality=self.lambda_seasonality,
        ).fit(y)
        self._backend_ = "from_scratch"
        return self

    def predict(self, steps: int = 0, *, ds: pd.Series | None = None) -> pd.DataFrame:
        if not hasattr(self, "_impl"):
            raise RuntimeError("Call fit() first")

        if self._backend_ == "prophet":
            if ds is None:
                last_ds = pd.to_datetime(self._train_df["ds"].iloc[-1])
                ds = _future_ds(last_ds, int(steps), self.freq_)
            future = pd.DataFrame({"ds": pd.to_datetime(ds)})
            fcst = self._impl.predict(future)
            keep = [c for c in ["ds", "yhat", "yhat_lower", "yhat_upper"] if c in fcst.columns]
            return fcst[keep].copy()

        return self._impl.predict(steps=steps, ds=ds)
# Create a synthetic series with: (1) trend + changepoint, (2) weekly + yearly seasonality
rng = np.random.default_rng(0)
ds = pd.date_range("2018-01-01", periods=900, freq="D")
t = np.arange(ds.size, dtype=float)

# Piecewise linear trend
cp = 450
trend = 0.02 * t + 2.0
trend[cp:] += 0.04 * (t[cp:] - cp)  # slope increases after changepoint

# Seasonalities
weekly = 1.5 * np.sin(2 * np.pi * t / 7.0)
yearly = 3.0 * np.sin(2 * np.pi * t / 365.25)

noise = rng.normal(0.0, 0.8, size=ds.size)
y = trend + weekly + yearly + noise

y_series = pd.Series(y, index=ds, name="y")
y_series.head()
2018-01-01    2.100584
2018-01-02    3.138668
2018-01-03    4.117924
2018-01-04    2.949499
2018-01-05    1.206905
Freq: D, Name: y, dtype: float64
# Fit our Prophet interface (will fallback to from-scratch in this environment)
model = Prophet(
    freq="D",
    backend="from_scratch",
    n_changepoints=30,
    changepoint_range=0.9,
    yearly_seasonality=10,
    weekly_seasonality=3,
    lambda_changepoints=50.0,
    lambda_seasonality=1.0,
).fit(y_series)

pred_future = model.predict(steps=120)
pred_future.head()
ds yhat trend seasonality t_days
0 2020-06-19 36.284578 36.113908 0.170669 900.0
1 2020-06-20 35.379175 36.167341 -0.788166 901.0
2 2020-06-21 35.735823 36.220774 -0.484951 902.0
3 2020-06-22 36.761379 36.274206 0.487173 903.0
4 2020-06-23 37.964227 36.327639 1.636588 904.0
# Build a combined frame for plotting
fitted = model._impl.fitted_.copy() if getattr(model, "_backend_", None) == "from_scratch" else None

train_df = _as_prophet_df(y_series)
train_df = train_df.rename(columns={"y": "y"})

fig = go.Figure()
fig.add_trace(go.Scatter(x=train_df["ds"], y=train_df["y"], mode="lines", name="observed"))

if fitted is not None:
    fig.add_trace(go.Scatter(x=fitted["ds"], y=fitted["yhat"], mode="lines", name="fitted", line=dict(width=2)))

fig.add_trace(go.Scatter(x=pred_future["ds"], y=pred_future["yhat"], mode="lines", name="forecast", line=dict(width=2)))

# Show changepoint locations (from-scratch backend)
if fitted is not None and getattr(model._impl, "changepoints_scaled_", None) is not None:
    cp_scaled = model._impl.changepoints_scaled_
    if cp_scaled.size > 0:
        cp_days = cp_scaled * model._impl.t_scale_
        cp_dates = model._impl.ds0_ + pd.to_timedelta(cp_days, unit="D")
        for d in cp_dates:
            fig.add_vline(x=d, line_width=1, line_dash="dot", line_color="gray", opacity=0.4)

fig.update_layout(
    title="Prophet-like forecast (trend + Fourier seasonality)",
    xaxis_title="date",
    yaxis_title="y",
    height=450,
)
fig.show()
/home/tempa/miniconda3/lib/python3.12/site-packages/plotly/io/_json.py:558: UserWarning:

Discarding nonzero nanoseconds in conversion.
# Component view (from-scratch backend)
if fitted is None:
    raise RuntimeError("Component plot is implemented for the from_scratch backend in this notebook")

fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.08, subplot_titles=("Trend", "Seasonality"))
fig.add_trace(go.Scatter(x=fitted["ds"], y=fitted["trend"], mode="lines", name="trend"), row=1, col=1)
fig.add_trace(go.Scatter(x=fitted["ds"], y=fitted["seasonality"], mode="lines", name="seasonality"), row=2, col=1)

fig.update_yaxes(title_text="trend", row=1, col=1)
fig.update_yaxes(title_text="seasonality", row=2, col=1)
fig.update_xaxes(title_text="date", row=2, col=1)
fig.update_layout(height=550, showlegend=False)
fig.show()

Notes (wrapper vs from-scratch)#

  • The official Prophet model is fitted using a Stan backend (MAP or MCMC) with priors chosen to regularize changepoints and seasonalities.

  • The from-scratch ProphetLike here uses ridge regression on:

    • trend basis \([1, t, \max(0,t-s_1), \ldots, \max(0,t-s_K)]\)

    • Fourier seasonal basis for weekly/yearly (and any add_seasonality specs)

  • It is meant for intuition and learning, not as a full drop-in replacement (no holiday effects, no logistic growth, no calibrated uncertainty intervals).